
Spark SQL Window Functions in Practice

References

This post draws mainly on the official documentation and the articles below.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

https://help.aliyun.com/document_detail/34994.html?spm=a2c4g.11174283.6.650.6f02590e0d209m#h2-url-1

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html

http://xinhstechblog.blogspot.com/2016/04/spark-window-functions-for-dataframes.html

http://cdn2.hubspot.net/hubfs/438089/notebooks/eBook/Introducing_Window_Functions_in_Spark_SQL_Notebook.html

http://blog.madhukaraphatak.com/introduction-to-spark-two-part-5/

https://www.cnblogs.com/piaolingzxh/p/5538783.html

Preparing the Data

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object WindowFunctionTest extends BaseSparkSession {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("WindowFunctionTest")
      .set("spark.master", "local[*]")
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Sample data: daily user counts per site (province)
    val df = List(
      ("浙江", "2018-01-01", 500),
      ("浙江", "2018-01-02", 450),
      ("浙江", "2018-01-03", 550),
      ("湖北", "2018-01-01", 250),
      ("湖北", "2018-01-02", 290),
      ("湖北", "2018-01-03", 270)
    ).toDF("site", "date", "user_cnt")
  }
}
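
The object above extends a BaseSparkSession that is not shown in the post. If you want to compile the example standalone, an empty stand-in trait is enough (an assumption on my part; the author's real trait presumably holds shared SparkSession setup):

// Hypothetical stand-in for the author's BaseSparkSession helper, which is not
// shown in the post. An empty trait is enough to make the example compile.
trait BaseSparkSession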

Moving Average

DataFrame API implementation

Option 1:

// The frame runs from -1 (the previous row) to 1 (the next row), so each sliding window holds at most 3 rows
val movingAvgSpec = Window.partitionBy("site").orderBy("date").rowsBetween(-1, 1)
df.withColumn("MovingAvg", avg(df("user_cnt")).over(movingAvgSpec)).show()

Option 2:

val movingAvgSpec = Window.partitionBy("site").orderBy("date").rowsBetween(-1, 1)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  avg($"user_cnt").over(movingAvgSpec).as("moving_avg_user_cnt")
).show()

SQL implementation

df.createOrReplaceTempView("site_info")
spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       avg(user_cnt) over(partition by site order by date rows between 1 preceding and 1 following) as moving_avg
    |from site_info
  """.stripMargin).show()

The lag function

Description: returns the value of the given column from the row x rows before the current row within the partition; if that row does not exist, null is returned, otherwise the actual value.

DataFrame API implementation

Option 1:

val lagwSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("prevUserCnt", lag(df("user_cnt"), 1).over(lagwSpec)).show()

Option 2:

val lagwSpec = Window.partitionBy("site").orderBy("date")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  lag($"user_cnt", 1).over(lagwSpec).as("lag_user_cnt")
).show()

SQL implementation

df.createOrReplaceTempView("site_info")
spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       lag(user_cnt, 1) over(partition by site order by date asc) as prevUserCnt
    |from site_info
  """.stripMargin).show()

The lead function

Description: returns the value of the given column from the row x rows after the current row within the partition; if that row does not exist, null is returned, otherwise the actual value.

DataFrame API implementation

Option 1:

val leadwSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("lead_user_cnt", lead(df("user_cnt"), 1).over(leadwSpec)).show()

Option 2:

val leadwSpec = Window.partitionBy("site").orderBy("date")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  lead($"user_cnt", 1).over(leadwSpec).as("lead_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       lead(user_cnt, 1) over(partition by site order by date asc) as lead_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+-------------+
|site| date|user_cnt|lead_user_cnt|
+----+----------+--------+-------------+
|湖北|2018-01-01| 250| 290|
|湖北|2018-01-02| 290| 270|
|湖北|2018-01-03| 270| null|
|浙江|2018-01-01| 500| 450|
|浙江|2018-01-02| 450| 550|
|浙江|2018-01-03| 550| null|
+----+----------+--------+-------------+

The FIRST_VALUE function

Description: returns the column value of the first row of the partition after ordering.

DataFrame API implementation

Option 1:

val firstValuewSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("first_value_user_cnt", first("user_cnt").over(firstValuewSpec)).show()

Option 2:

val firstValuewSpec = Window.partitionBy("site").orderBy("date")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  first($"user_cnt").over(firstValuewSpec).as("first_value_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       first_value(user_cnt) over(partition by site order by date asc) as first_value_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+--------------------+
|site| date|user_cnt|first_value_user_cnt|
+----+----------+--------+--------------------+
|湖北|2018-01-01| 250| 250|
|湖北|2018-01-02| 290| 250|
|湖北|2018-01-03| 270| 250|
|浙江|2018-01-01| 500| 500|
|浙江|2018-01-02| 450| 500|
|浙江|2018-01-03| 550| 500|
+----+----------+--------+--------------------+
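
The DataFrame first function also accepts an ignoreNulls flag that skips leading nulls when picking the first value. The sample data has no nulls, so the output would be identical here; a minimal sketch:

// first(column, ignoreNulls = true) skips nulls when choosing the first value per partition
val firstValuewSpec = Window.partitionBy("site").orderBy("date")
df.withColumn(
  "first_non_null_user_cnt",
  first($"user_cnt", ignoreNulls = true).over(firstValuewSpec)
).show()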

The LAST_VALUE function

Description: returns the column value of the last row of the partition after ordering.

DataFrame API implementation

Option 1:

val lastValuewSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("last_value_user_cnt", last("user_cnt").over(lastValuewSpec)).show()

Option 2:

val lastValuewSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  last($"user_cnt").over(lastValuewSpec).as("last_value_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       last_value(user_cnt) over(partition by site order by date asc rows between unbounded preceding and unbounded following) as last_value_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+-------------------+
|site| date|user_cnt|last_value_user_cnt|
+----+----------+--------+-------------------+
|湖北|2018-01-01| 250| 270|
|湖北|2018-01-02| 290| 270|
|湖北|2018-01-03| 270| 270|
|浙江|2018-01-01| 500| 550|
|浙江|2018-01-02| 450| 550|
|浙江|2018-01-03| 550| 550|
+----+----------+--------+-------------------+
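
Note the explicit rows between unbounded preceding and unbounded following frame. When an order by is present, the default frame only extends from the start of the partition to the current row, so last_value would simply return the current row's own value. A small sketch (my illustration, not from the original post) of that default-frame behaviour:

// With only orderBy, the frame defaults to "unbounded preceding .. current row",
// so last() returns the current row's user_cnt rather than the partition's last value
val defaultFrameSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("last_value_default_frame", last("user_cnt").over(defaultFrameSpec)).show()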

The COUNT function

Description: this function computes a count.

Without order by

DataFrame API implementation

Option 1:

val counWSpec = Window.partitionBy("site")
df.withColumn("count", count("user_cnt").over(counWSpec)).show()

Option 2:

val counWSpec = Window.partitionBy("site")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  count($"user_cnt").over(counWSpec).as("count")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       count(user_cnt) over(partition by site) as count
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+-----+
|site| date|user_cnt|count|
+----+----------+--------+-----+
|湖北|2018-01-01| 250| 3|
|湖北|2018-01-02| 290| 3|
|湖北|2018-01-03| 270| 3|
|浙江|2018-01-01| 500| 3|
|浙江|2018-01-02| 450| 3|
|浙江|2018-01-03| 550| 3|
+----+----------+--------+-----+

With order by

When order by is specified, the count is cumulative: each row gets the count of rows from the start of the window up to and including the current row.

DataFrame API implementation

Option 1:

val counWSpec = Window.partitionBy("site").orderBy('date.asc)
df.withColumn("count", count("user_cnt").over(counWSpec)).show()

Option 2:

val counWSpec = Window.partitionBy("site").orderBy("date")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  count($"user_cnt").over(counWSpec).as("count")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       count(user_cnt) over(partition by site order by date) as count
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+-----+
|site| date|user_cnt|count|
+----+----------+--------+-----+
|湖北|2018-01-01| 250| 1|
|湖北|2018-01-02| 290| 2|
|湖北|2018-01-03| 270| 3|
|浙江|2018-01-01| 500| 1|
|浙江|2018-01-02| 450| 2|
|浙江|2018-01-03| 550| 3|
+----+----------+--------+-----+

The sum function

Description: this function computes a sum.

Without order by

DataFrame API implementation

Option 1:

val sumWSpec = Window.partitionBy("site")
df.withColumn("sum_user_cnt", sum("user_cnt").over(sumWSpec)).show()

Option 2:

val sumWSpec = Window.partitionBy("site")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  sum($"user_cnt").over(sumWSpec).as("sum_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       sum(user_cnt) over(partition by site) as sum_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+------------+
|site| date|user_cnt|sum_user_cnt|
+----+----------+--------+------------+
|湖北|2018-01-01| 250| 810|
|湖北|2018-01-02| 290| 810|
|湖北|2018-01-03| 270| 810|
|浙江|2018-01-01| 500| 1500|
|浙江|2018-01-02| 450| 1500|
|浙江|2018-01-03| 550| 1500|
+----+----------+--------+------------+

With order by

DataFrame API implementation

Option 1:

val sumWSpec = Window.partitionBy("site").orderBy('date.asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("sum_user_cnt", sum("user_cnt").over(sumWSpec)).show()

Option 2:

val sumWSpec = Window.partitionBy("site").orderBy('date.asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  sum($"user_cnt").over(sumWSpec).as("sum_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       sum(user_cnt) over(partition by site order by date asc rows between unbounded preceding and unbounded following) as sum_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+------------+
|site| date|user_cnt|sum_user_cnt|
+----+----------+--------+------------+
|湖北|2018-01-01| 250| 810|
|湖北|2018-01-02| 290| 810|
|湖北|2018-01-03| 270| 810|
|浙江|2018-01-01| 500| 1500|
|浙江|2018-01-02| 450| 1500|
|浙江|2018-01-03| 550| 1500|
+----+----------+--------+------------+
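
Because the frame above spans the whole partition, the result is identical to the case without order by. If you keep the default frame instead (order by only, no explicit rowsBetween), sum becomes a running total per site; a small sketch of that variant:

// Default frame with orderBy is "unbounded preceding .. current row", i.e. a running total.
// For 湖北 this should yield 250, 540, 810 and for 浙江 500, 950, 1500.
val runningSumSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("running_user_cnt", sum("user_cnt").over(runningSumSpec)).show()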

The min function

Without order by

DataFrame API implementation

Option 1:

val minWSpec = Window.partitionBy("site")
df.withColumn("min_user_cnt", min("user_cnt").over(minWSpec)).show()

Option 2:

val minWSpec = Window.partitionBy("site")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  min($"user_cnt").over(minWSpec)
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       min(user_cnt) over(partition by site) as min_user_cnt
    |from site_info
  """.stripMargin).show()

Result

(The long column name below is what Spark generates when the window column is not given an alias.)

+----+----------+--------+----------------------------------------------------------+
|site| date|user_cnt|min(user_cnt) OVER (PARTITION BY site unspecifiedframe$())|
+----+----------+--------+----------------------------------------------------------+
|湖北|2018-01-01| 250| 250|
|湖北|2018-01-02| 290| 250|
|湖北|2018-01-03| 270| 250|
|浙江|2018-01-01| 500| 450|
|浙江|2018-01-02| 450| 450|
|浙江|2018-01-03| 550| 450|
+----+----------+--------+----------------------------------------------------------+

With order by

Option 1:

val minWSpec = Window.partitionBy("site").orderBy('date.asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("min_user_cnt", min("user_cnt").over(minWSpec)).show()

Option 2:

val minWSpec = Window.partitionBy("site").orderBy('date.asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  min($"user_cnt").over(minWSpec)
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       min(user_cnt) over(partition by site order by date asc rows between unbounded preceding and unbounded following) as min_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+----------------------------------------------------------+
|site| date|user_cnt|min(user_cnt) OVER (PARTITION BY site unspecifiedframe$())|
+----+----------+--------+----------------------------------------------------------+
|湖北|2018-01-01| 250| 250|
|湖北|2018-01-02| 290| 250|
|湖北|2018-01-03| 270| 250|
|浙江|2018-01-01| 500| 450|
|浙江|2018-01-02| 450| 450|
|浙江|2018-01-03| 550| 450|
+----+----------+--------+----------------------------------------------------------+

The max function

Without order by

DataFrame API implementation

Option 1:

val maxWSpec = Window.partitionBy("site")
df.withColumn("max_user_cnt", max("user_cnt").over(maxWSpec)).show()

Option 2:

val maxWSpec = Window.partitionBy("site")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  max($"user_cnt").over(maxWSpec).as("max_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       max(user_cnt) over(partition by site) as max_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+------------+
|site| date|user_cnt|max_user_cnt|
+----+----------+--------+------------+
|湖北|2018-01-01| 250| 290|
|湖北|2018-01-02| 290| 290|
|湖北|2018-01-03| 270| 290|
|浙江|2018-01-01| 500| 550|
|浙江|2018-01-02| 450| 550|
|浙江|2018-01-03| 550| 550|
+----+----------+--------+------------+

With order by

DataFrame API implementation

Option 1:

val maxWSpec = Window.partitionBy("site").orderBy('date.asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("max_user_cnt", max("user_cnt").over(maxWSpec)).show()

Option 2:

val maxWSpec = Window.partitionBy("site").orderBy('date.asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  max($"user_cnt").over(maxWSpec).as("max_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       max(user_cnt) over(partition by site order by date asc rows between unbounded preceding and unbounded following) as max_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+------------+
|site| date|user_cnt|max_user_cnt|
+----+----------+--------+------------+
|湖北|2018-01-01| 250| 290|
|湖北|2018-01-02| 290| 290|
|湖北|2018-01-03| 270| 290|
|浙江|2018-01-01| 500| 550|
|浙江|2018-01-02| 450| 550|
|浙江|2018-01-03| 550| 550|
+----+----------+--------+------------+

The avg function

Description: this function computes an average.

Without order by

DataFrame API implementation

Option 1:

val avgWSpec = Window.partitionBy("site")
df.withColumn("avg_user_cnt", avg("user_cnt").over(avgWSpec)).show()

Option 2:

val avgWSpec = Window.partitionBy("site")
df.select(
  $"site",
  $"date",
  $"user_cnt",
  avg("user_cnt").over(avgWSpec).as("avg_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       avg(user_cnt) over(partition by site) as avg_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+------------+
|site| date|user_cnt|avg_user_cnt|
+----+----------+--------+------------+
|湖北|2018-01-01| 250| 270.0|
|湖北|2018-01-02| 290| 270.0|
|湖北|2018-01-03| 270| 270.0|
|浙江|2018-01-01| 500| 500.0|
|浙江|2018-01-02| 450| 500.0|
|浙江|2018-01-03| 550| 500.0|
+----+----------+--------+------------+

With order by

DataFrame API implementation

Option 1:

val avgWSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("avg_user_cnt", avg("user_cnt").over(avgWSpec)).show()

Option 2:

val avgWSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  avg("user_cnt").over(avgWSpec).as("avg_user_cnt")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       avg(user_cnt) over(partition by site order by date asc rows between unbounded preceding and unbounded following) as avg_user_cnt
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+------------+
|site| date|user_cnt|avg_user_cnt|
+----+----------+--------+------------+
|湖北|2018-01-01| 250| 270.0|
|湖北|2018-01-02| 290| 270.0|
|湖北|2018-01-03| 270| 270.0|
|浙江|2018-01-01| 500| 500.0|
|浙江|2018-01-02| 450| 500.0|
|浙江|2018-01-03| 550| 500.0|
+----+----------+--------+------------+

The rank function

Description: this function computes a rank. Rows with equal ordering values receive the same rank, and the following rank is skipped (1, 1, 3, ...).

DataFrame API implementation

Option 1:

val rankWSpec = Window.partitionBy("site").orderBy('user_cnt.desc)
df.withColumn("rank", rank().over(rankWSpec)).show()

Option 2:

val rankWSpec = Window.partitionBy("site").orderBy('user_cnt.desc)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  rank().over(rankWSpec).as("rank")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       rank() over(partition by site order by user_cnt desc) as rank_user_cnt
    |from site_info
  """.stripMargin).show()

Result

Within each site, the row with the highest user_cnt is ranked 1, the next highest 2, and so on.

The row_number function

DataFrame API implementation

Option 1:

val rowNumberWSpec = Window.partitionBy("site").orderBy('date.desc, 'user_cnt.desc)
df.withColumn("row_num", row_number().over(rowNumberWSpec)).show()

Option 2:

val rowNumberWSpec = Window.partitionBy("site").orderBy('date.desc, 'user_cnt.desc)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  row_number().over(rowNumberWSpec).as("row_num")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       row_number() over(partition by site order by date desc, user_cnt desc) as row_num
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+-------+
|site| date|user_cnt|row_num|
+----+----------+--------+-------+
|湖北|2018-01-03| 270| 1|
|湖北|2018-01-02| 290| 2|
|湖北|2018-01-01| 250| 3|
|浙江|2018-01-03| 550| 1|
|浙江|2018-01-02| 450| 2|
|浙江|2018-01-01| 500| 3|
+----+----------+--------+-------+
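
row_number is often used to keep just the top N rows per group, for example the most recent record per site. A minimal sketch of that pattern (my addition, not in the original post):

// Keep only the most recent row per site: number rows by date descending, then filter row_num = 1
val latestPerSiteSpec = Window.partitionBy("site").orderBy($"date".desc)
val latestPerSite = df
  .withColumn("row_num", row_number().over(latestPerSiteSpec))
  .where($"row_num" === 1)
  .drop("row_num")
latestPerSite.show()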

The dense_rank function

Description: this function computes a rank without gaps: rows with equal ordering values share a rank, and the next rank is not skipped (1, 1, 2, ...).

DataFrame API implementation

Option 1:

val denseRankWSpec = Window.partitionBy("site").orderBy('date.asc)
df.withColumn("dense_rank", dense_rank().over(denseRankWSpec)).show()

Option 2:

val denseRankWSpec = Window.partitionBy("site").orderBy('date.asc)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  dense_rank().over(denseRankWSpec).as("dense_rank")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       dense_rank() over(partition by site order by date asc) as dense_rank
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+----------+
|site| date|user_cnt|dense_rank|
+----+----------+--------+----------+
|湖北|2018-01-01| 250| 1|
|湖北|2018-01-02| 290| 2|
|湖北|2018-01-03| 270| 3|
|浙江|2018-01-01| 500| 1|
|浙江|2018-01-02| 450| 2|
|浙江|2018-01-03| 550| 3|
+----+----------+--------+----------+
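
The sample data has no ties, so rank and dense_rank would produce the same numbers here. The difference only shows up on duplicate ordering values; a small sketch with made-up tied data (my illustration) contrasts the three ranking functions:

// Hypothetical data with a tie on user_cnt to show how the ranking functions differ
val tied = List(
  ("浙江", "2018-01-01", 500),
  ("浙江", "2018-01-02", 500),
  ("浙江", "2018-01-03", 450)
).toDF("site", "date", "user_cnt")

val byCnt = Window.partitionBy("site").orderBy($"user_cnt".desc)
tied.select(
  $"site", $"date", $"user_cnt",
  row_number().over(byCnt).as("row_number"), // 1, 2, 3
  rank().over(byCnt).as("rank"),             // 1, 1, 3 (gap after the tie)
  dense_rank().over(byCnt).as("dense_rank")  // 1, 1, 2 (no gap)
).show()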

The percent_rank function

Description: this function computes a row's relative rank within its partition, defined as (rank - 1) / (number of rows in the partition - 1).

DataFrame API implementation

Option 1:

val percentRankWSpec = Window.partitionBy("site").orderBy('date.asc)
df.withColumn("percent_rank", percent_rank().over(percentRankWSpec)).show()

Option 2:

val percentRankWSpec = Window.partitionBy("site").orderBy('date.asc)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  percent_rank().over(percentRankWSpec).as("percent_rank")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       percent_rank() over(partition by site order by date asc) as percent_rank
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+------------+
|site|      date|user_cnt|percent_rank|
+----+----------+--------+------------+
|湖北|2018-01-01| 250| 0.0|
|湖北|2018-01-02| 290| 0.5|
|湖北|2018-01-03| 270| 1.0|
|浙江|2018-01-01| 500| 0.0|
|浙江|2018-01-02| 450| 0.5|
|浙江|2018-01-03| 550| 1.0|
+----+----------+--------+------------+
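
These values line up with the formula: each partition has 3 rows, so ranks 1, 2 and 3 map to (1 - 1) / (3 - 1) = 0.0, (2 - 1) / (3 - 1) = 0.5 and (3 - 1) / (3 - 1) = 1.0.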

The ntile function

Description: splits each partition, in order, into n slices and returns the slice number of the current row; when the rows cannot be divided evenly, the earlier slices receive the extra rows.

DataFrame API implementation

Option 1:

val ntileRankWSpec = Window.partitionBy("site").orderBy('date.asc)
df.withColumn("ntile", ntile(2).over(ntileRankWSpec)).show()

Option 2:

val ntileRankWSpec = Window.partitionBy("site").orderBy('date.asc)
df.select(
  $"site",
  $"date",
  $"user_cnt",
  ntile(2).over(ntileRankWSpec).as("ntile")
).show()

SQL implementation

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       ntile(2) over(partition by site order by date) as ntile
    |from site_info
  """.stripMargin).show()

Result

+----+----------+--------+-----+
|site| date|user_cnt|ntile|
+----+----------+--------+-----+
|湖北|2018-01-01| 250| 1|
|湖北|2018-01-02| 290| 1|
|湖北|2018-01-03| 270| 2|
|浙江|2018-01-01| 500| 1|
|浙江|2018-01-02| 450| 1|
|浙江|2018-01-03| 550| 2|
+----+----------+--------+-----+
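
This matches the uneven-split rule: 3 rows divided into 2 slices gives the first slice 2 rows and the second slice 1 row. Likewise (my illustration, not from the original post), 7 rows with ntile(3) would be numbered 1, 1, 1, 2, 2, 3, 3, i.e. slice sizes 3, 2, 2.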

Thanks!

My abilities are limited and mistakes in this blog are hard to avoid; if you find an error, please send it to t_spider@aliyun.com.

Title: Spark SQL Window Functions in Practice

Author: tang

Published: 2019-04-15 16:04

Last updated: 2019-04-15 16:04

Original link: https://tgluon.github.io/2019/04/15/spark sql窗口函数实战/

License: CC BY-NC-ND 4.0 International. Please keep the original link and author attribution when reposting.
